Analyzing US Flights with Cloudera CDH and Sparklyr

Demo source

 Author: Aki Ariga
 R Pubs doc: https://rpubs.com/chezou/usflights-en


Data sources

Raw data sources

Using raw data to create Hive and Impala tables

In order to use the data above, download it and place it into HDFS or S3.

Then create the table definitions.

Warning about data tables

As of dplyr_0.5.0.9000, the tbl() function does not handle Hive tables in a schema other than default gracefully. The recommended workaround is to pass a sql() query to tbl(), for example:

airports <- tbl(sc, sql("SELECT * FROM airlines.airports"))

However, the data frame created this way differs in type from the one tbl() returns for a plain table name, and the gcIntermediate function used below fails on it. This notebook therefore assumes that the airlines and airports tables are in the default database.

USE default;
CREATE EXTERNAL TABLE IF NOT EXISTS airlines 
LIKE PARQUET '/user/centos/data/airlines_parquet/4345e5eef217aa1b-c8f16177f35fd983_1150363067_data.0.parq' 
STORED AS PARQUET LOCATION '/user/centos/data/airlines_parquet';

CREATE EXTERNAL TABLE IF NOT EXISTS airports_raw 
(iata STRING, airport STRING, city STRING, state STRING, country STRING, latitude FLOAT, longitude FLOAT ) 
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' 
STORED AS TEXTFILE 
LOCATION 'hdfs:///user/centos/data/airports' 
TBLPROPERTIES ('skip.header.line.count'='1');

CREATE TABLE airports STORED AS PARQUET AS
SELECT regexp_replace(iata, '"', '') AS iata,
regexp_replace(airport, '"', '') AS airport,
regexp_replace(city, '"', '') AS city,
regexp_replace(state, '"', '') AS state,
regexp_replace(country, '"', '') AS country,
latitude,
longitude
FROM airports_raw;
-- WHERE iata != '"iata"'

Load R libraries


In [ ]:
# Load libraries
library(ggplot2)
library(maps)
library(geosphere)
library(sparklyr)
library(dplyr)

Establish Spark connection

Adjust the parameters for the cluster you are running on. For a local run, use master="local".

sparklyr.gateway.port is set explicitly here because a notebook is often already running on the default port, 8880.


In [ ]:
# Configure cluster
config <- spark_config()
config$spark.driver.cores   <- 4
config$spark.executor.cores <- 4
config$spark.executor.memory <- "4G"
config$sparklyr.gateway.port <- 8877
#spark_home <- "/opt/cloudera/parcels/CDH/lib/spark"
#spark_version <- "1.6.2"
spark_home <- "/opt/cloudera/parcels/SPARK2/lib/spark2"
spark_version <- "2.0.0"
sc <- spark_connect(master="yarn-client", version=spark_version, config=config, spark_home=spark_home)
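
For a local run, the configuration above can be cut down considerably. The following is a minimal sketch, not the author's setup: it assumes a local Spark installation that sparklyr can find (for example one installed beforehand with spark_install()), and the name sc_local is chosen here only for illustration.

```r
library(sparklyr)

# Assumption: a local Spark installation is available to sparklyr
config <- spark_config()
config$sparklyr.gateway.port <- 8877  # same non-default gateway port as above

# master = "local" runs Spark inside a single local process
sc_local <- spark_connect(master = "local", config = config)

# ... run the analysis against sc_local ...

# Release the connection (and the gateway port) when finished
spark_disconnect(sc_local)
```

Disconnecting explicitly frees the gateway port, which matters when several notebooks share one machine.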

In [ ]:
# airlines <- tbl(src=sc, "airlines")
# airlines <- tbl(src=sc, sql("select * FROM airlines.airlines"))
airlines <- tbl(src=sc, "airlines")

head(airlines)

In [ ]:
airline_counts_by_year <- airlines %>% group_by(year) %>% summarise(count=n()) %>% collect
airline_counts_by_year %>% tbl_df %>% print(n=nrow(.))

In [ ]:
g <- ggplot(airline_counts_by_year, aes(x=year, y=count))
g <- g + geom_line(
  colour = "magenta",
  linetype = 1,
  size = 0.8
)
g <- g + xlab("Year")
g <- g + ylab("Number of flights")
g <- g + ggtitle("US flights")
plot(g)

In [ ]:
airline_counts_by_month <- airlines %>%
  filter(year >= 2001 & year <= 2003) %>%
  group_by(year, month) %>%
  summarise(count = n()) %>%
  collect

# Build the x axis from the first day of each year/month pair
g <- ggplot(
  airline_counts_by_month,
  aes(x = as.Date(sprintf("%d-%02d-01", year, month)), y = count)
)
g <- g + geom_line(
  colour = "magenta",
  linetype = 1,
  size = 0.8
)
g <- g + xlab("Year/Month")
g <- g + ylab("Number of flights")
g <- g + ggtitle("US flights")
plot(g)
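
The x axis in the monthly plot is built by turning each year/month pair into the first day of that month. A minimal base R sketch of that conversion follows. The data frame ym and its values are made up for illustration and stand in for the collected Spark result.

```r
# Hypothetical year/month pairs standing in for the collected result
ym <- data.frame(year = c(2002L, 2001L, 2001L), month = c(1L, 12L, 9L))

# Zero-pad the month and pin each pair to the first day of that month
ym$date <- as.Date(sprintf("%d-%02d-01", ym$year, ym$month))

# Sort chronologically; rows from a distributed aggregation may arrive in any order
ym <- ym[order(ym$date), ]
print(ym)
```

geom_line() orders points by x on its own, so the explicit sort matters mainly when printing or inspecting the table.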

In [ ]:
flights <- airlines %>% group_by(year, carrier, origin, dest) %>% summarise(count=n()) %>% collect
head(flights)

In [ ]:
# airports <- tbl(sc, sql("select * FROM airlines.airports")) %>% collect
airports <- tbl(sc, "airports") %>% collect
# Now extract AA's flights in 2007.

flights_aa <- flights %>% filter(year==2007) %>% filter(carrier=="AA") %>% arrange(count)
head(flights_aa)

Visualize flight paths

Flight visualization code is taken from this article: http://flowingdata.com/2011/05/11/how-to-map-connections-with-great-circles/
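
Each route is drawn as a great-circle path computed with gcIntermediate from the geosphere package. As a small sketch of what that call returns for one pair of points, the example below uses two approximate coordinates that are assumptions for illustration and are not taken from the airports table.

```r
library(geosphere)

# Approximate (longitude, latitude) pairs, assumed for illustration only
p_east <- c(-73.78, 40.64)
p_west <- c(-118.41, 33.94)

# n intermediate points along the great circle, plus the two end points
path <- gcIntermediate(p_east, p_west, n = 100, addStartEnd = TRUE)

# The result is a two-column matrix (longitude, latitude) that lines() can draw
print(dim(path))
print(head(path, 3))
```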


In [ ]:
# Draw a map with AA's routes as lines
xlim <- c(-171.738281, -56.601563)
ylim <- c(12.039321, 71.856229)

# Color settings
#require('RColorBrewer')
#colors <- brewer.pal(11,'RdYlBu')
pal <- colorRampPalette(c("#333333", "white", "#1292db"))
colors <- pal(100)

map("world", col="#6B6363", fill=TRUE, bg="#000000", lwd=0.05, xlim=xlim, ylim=ylim)

maxcnt <- max(flights_aa$count)

for (j in seq_len(nrow(flights_aa))) {
  # Look up the origin and destination airports for this route
  air1 <- airports[airports$iata == flights_aa[j,]$origin,]
  air2 <- airports[airports$iata == flights_aa[j,]$dest,]
  inter <- gcIntermediate(c(air1[1,]$longitude, air1[1,]$latitude), c(air2[1,]$longitude, air2[1,]$latitude), n=100, addStartEnd=TRUE)
  # Scale the count to a palette index; clamp to 1 so small counts never yield index 0
  colindex <- max(1, round( (flights_aa[j,]$count / maxcnt) * length(colors) ))

  lines(inter, col=colors[colindex], lwd=0.8)
}
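
The loop scales each route's flight count to an index into the 100-step palette, so busier routes are drawn in brighter colours. The sketch below isolates that mapping in base R. The helper count_to_index and the sample counts are hypothetical and are not part of the original notebook; the clamp keeps very small counts from rounding down to index 0, which is not a valid position in an R vector.

```r
# Build the same 100-step palette as in the plotting cell
pal <- colorRampPalette(c("#333333", "white", "#1292db"))
colors <- pal(100)

# Hypothetical helper: scale a count to a palette index in 1..n_colors
count_to_index <- function(count, max_count, n_colors) {
  idx <- round((count / max_count) * n_colors)
  # Clamp into the valid index range
  pmin(pmax(idx, 1), n_colors)
}

counts <- c(1, 250, 500, 1000)  # made-up route counts
idx <- count_to_index(counts, max(counts), length(colors))
print(idx)
print(colors[idx])
```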
